Skip to content

✨ SQS server#233

Open
aleksul wants to merge 6 commits intomainfrom
sqs
Open

✨ SQS server#233
aleksul wants to merge 6 commits intomainfrom
sqs

Conversation

@aleksul
Copy link
Copy Markdown
Owner

@aleksul aleksul commented Apr 13, 2026

No description provided.

@cloudflare-workers-and-pages
Copy link
Copy Markdown

cloudflare-workers-and-pages bot commented Apr 13, 2026

Deploying repid with  Cloudflare Pages  Cloudflare Pages

Latest commit: a53c6db
Status: ✅  Deploy successful!
Preview URL: https://7efa2563.repid.pages.dev
Branch Preview URL: https://sqs.repid.pages.dev

View logs

@github-actions
Copy link
Copy Markdown

github-actions bot commented Apr 13, 2026

Coverage Report

Name Stmts Miss Cover Missing
TOTAL 7781 0 100%

102 files skipped due to complete coverage.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds an Amazon SQS transport to the repid.connections layer (backed by aiobotocore) along with ElasticMQ-powered integration coverage, enabling SQS to be used as a first-class Repid server.

Changes:

  • Introduces SqsServer, SqsSubscriber, and SqsReceivedMessage implementations using aiobotocore.
  • Adds an ElasticMQ docker fixture and extensive SQS-specific integration/unit-style tests.
  • Exposes SqsServer via repid.connections when aiobotocore is installed and adds sqs/typing dependencies in pyproject.toml.

Reviewed changes

Copilot reviewed 8 out of 9 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
repid/connections/sqs/message_broker.py Implements SqsServer lifecycle, publish, and subscribe using aiobotocore.
repid/connections/sqs/subscriber.py Implements background consumption, concurrency limiting, and pause/resume/close semantics for SQS.
repid/connections/sqs/message.py Implements ReceivedMessageT actions (ack/nack/reject/reply) for SQS messages.
repid/connections/sqs/__init__.py Exports SqsServer from the SQS subpackage.
repid/connections/__init__.py Conditionally exposes SqsServer when aiobotocore is available.
tests/integration/conftest.py Adds an ElasticMQ container + sqs_connection fixture and includes it in autoconn parametrization.
tests/integration/test_sqs_specific.py Adds a large SQS-specific test suite covering server/message/subscriber behaviors and edge cases.
pyproject.toml Adds sqs extra (aiobotocore) and typing dependency (types-aiobotocore[sqs]).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@aleksul
Copy link
Copy Markdown
Owner Author

aleksul commented Apr 13, 2026

@copilot apply changes based on the comments in this thread

Copilot AI and others added 2 commits April 13, 2026 23:28
Copy link
Copy Markdown
Contributor

Copilot AI commented Apr 13, 2026

@copilot apply changes based on the comments in this thread

Applied all changes from the review thread (commits c15ea3d and c1aa69f):

  • subscriber.py: Added concurrency_limit > 0 validation with a clear ValueError; pause() now awaits cancelled _tasks via asyncio.gather before clearing.
  • message.py: Skips None StringValue entries; excludes _repid_encoding from user headers; decodes body from base64 when _repid_encoding == base64; reply() now uses base64 encoding and has consistent ClientError handling for delete_message() matching ack()/nack()/reject().
  • message_broker.py: _client_cm initialized in __init__; "thinking out loud" comments removed from connect(); subscribe() adds a done callback to auto-remove from _active_subscribers; disconnect() clears _active_subscribers in a finally block; publish() now uses base64 encoding for arbitrary-bytes safety.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 8 out of 9 changed files in this pull request and generated 5 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@aleksul
Copy link
Copy Markdown
Owner Author

aleksul commented Apr 14, 2026

@copilot apply changes based on the comments in this thread

Copy link
Copy Markdown
Contributor

Copilot AI commented Apr 14, 2026

@copilot apply changes based on the comments in this thread

Applied all changes from the thread in commit d2b1e13:

  • base64.b64decode now uses validate=True
  • _repid_encoding attribute removed from publish() and reply() — always base64 encode/decode without a custom attribute
  • Test test_sqs_message_reject_suppresses_exceptions: removed suppress(Exception), added assertions
  • Test test_sqs_message_reply_with_server_params: removed suppress(Exception), added action == MessageAction.replied assertion
  • Test test_sqs_message_nack_without_dlq: removed suppress(Exception)

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 8 out of 9 changed files in this pull request and generated 4 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +195 to +198
body_str = base64.b64encode(message.payload).decode("ascii")
if not body_str:
body_str = " "

Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

publish() uses a single-space MessageBody sentinel when the payload is empty, but SqsReceivedMessage base64-decodes with validate=True and falls back to UTF-8 bytes on failure. This makes an empty payload round-trip as b" " instead of b"". Consider treating the sentinel body (e.g., exactly " ") as an empty payload when parsing, or use a different encoding strategy that preserves empty bytes while satisfying SQS' non-empty body requirement.

Copilot uses AI. Check for mistakes.
Comment on lines +49 to +52
try:
self._payload = base64.b64decode(body, validate=True)
except (binascii.Error, ValueError):
self._payload = str(body).encode("utf-8", errors="replace")
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SqsReceivedMessage currently attempts strict base64 decode and on failure returns the UTF-8 bytes of the raw body. Since SqsServer.publish()/reply() use a single space as a sentinel for empty payloads, this code will decode that sentinel into b" " instead of an empty payload. Special-case the sentinel body (exactly " ") to produce b"" before attempting base64 decode.

Suggested change
try:
self._payload = base64.b64decode(body, validate=True)
except (binascii.Error, ValueError):
self._payload = str(body).encode("utf-8", errors="replace")
if body == " ":
self._payload = b""
else:
try:
self._payload = base64.b64decode(body, validate=True)
except (binascii.Error, ValueError):
self._payload = str(body).encode("utf-8", errors="replace")

Copilot uses AI. Check for mistakes.

# Suppress exceptions from the detached reject task
def _ignore_exception(task: asyncio.Task) -> None:
with contextlib.suppress(Exception):
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the detached reject() task created during cancellation, _ignore_exception only suppresses Exception. If the reject task itself is cancelled, task.result() raises asyncio.CancelledError (a BaseException), which can surface as an unhandled exception in the event loop's done-callback handling. Suppress asyncio.CancelledError here as well (as is already done in _consume_channel).

Suggested change
with contextlib.suppress(Exception):
with contextlib.suppress(asyncio.CancelledError, Exception):

Copilot uses AI. Check for mistakes.
Comment on lines +577 to +584
task = asyncio.create_task(sub._consume_channel("default"))
await asyncio.sleep(0.1)
task.cancel()
await asyncio.sleep(0.1)

server._client = original_client
await sub.close()

Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test cancels the background task but never awaits it. That can lead to "Task was destroyed but it is pending" warnings (or failures under stricter asyncio settings). After task.cancel(), await the task under contextlib.suppress(asyncio.CancelledError) (or ensure it finishes) before ending the test.

Copilot uses AI. Check for mistakes.
@aleksul
Copy link
Copy Markdown
Owner Author

aleksul commented Apr 14, 2026

@copilot apply changes based on the comments in this thread

…ed task in test

Agent-Logs-Url: https://github.com/aleksul/repid/sessions/61c3fe0b-b86f-4769-abd9-62baab78188b

Co-authored-by: aleksul <63556725+aleksul@users.noreply.github.com>
Copy link
Copy Markdown
Contributor

Copilot AI commented Apr 14, 2026

@copilot apply changes based on the comments in this thread

Applied all changes from the thread in commit a53c6db:

  • message.py: Added sentinel check if body == " ": self._payload = b"" so empty payloads round-trip as b"" instead of b" "
  • subscriber.py: _ignore_exception now suppresses asyncio.CancelledError in addition to Exception (needed on Python 3.10+ where CancelledError is a BaseException, not Exception)
  • Test: replaced await asyncio.sleep(0.1) after task.cancel() with suppress(asyncio.CancelledError): await task to properly await the cancelled task

@aleksul aleksul requested a review from Copilot April 14, 2026 14:14
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 8 out of 9 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +44 to +52
def is_active(self) -> bool:
return self._active

@property
def task(self) -> asyncio.Task:
if self._main_task is None:
raise RuntimeError("Subscriber is not active.")
return self._main_task

Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is_active is driven solely by the _active flag, but _active is never reset when the main consume task finishes on its own (e.g., when channels_to_callbacks is empty and _consume() returns immediately). This can leave a subscriber reporting active even though its task is already done, and resume() won’t restart it. Consider defining is_active based on both _active and _main_task state (e.g., _main_task is not None and not _main_task.done()), and/or ensuring _consume() sets _active = False on exit.

Copilot uses AI. Check for mistakes.
MessageAttributeNames=["All"],
)

messages = response.get("Messages", [])
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When receive_wait_time_seconds is configured as 0 (used in tests and potentially by users), an empty receive response results in a tight loop that repeatedly calls receive_message() with no backoff. This can cause unnecessary CPU usage and request spam. Consider adding a small sleep/yield when messages is empty and wait time is 0 (or enforcing a minimum wait time) to prevent busy looping.

Suggested change
messages = response.get("Messages", [])
messages = response.get("Messages", [])
if not messages and self._server._receive_wait_time_seconds <= 0:
await asyncio.sleep(0.1)
continue

Copilot uses AI. Check for mistakes.
Comment on lines +1 to +3
from repid.connections.sqs.message_broker import SqsServer

__all__ = ["SqsServer"]
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

repid/connections/sqs/__init__.py uses a plain from ... import SqsServer plus a string-based __all__. In this codebase, other connection subpackages re-export with from ... import <Name> as <Name> (e.g., repid/connections/redis/__init__.py:1-2, repid/connections/nats/__init__.py:1) which avoids unused-import linting and keeps the pattern consistent. Consider switching to from repid.connections.sqs.message_broker import SqsServer as SqsServer and dropping __all__ to match the established convention.

Suggested change
from repid.connections.sqs.message_broker import SqsServer
__all__ = ["SqsServer"]
from repid.connections.sqs.message_broker import SqsServer as SqsServer

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants